package com.sohu3;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Random;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;

import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;

/**
 * Listener adapter that reacts to the creation of keyword files.
 * <p>
 * For each newly created file reported through {@link #onFileCreate(File)},
 * the file is read line by line and every line is sent through the supplied
 * producer as one message on the configured topic.
 *
 * @author sks
 */
public class KeywordFileAdapter extends FileAlterationListenerAdaptor {
	/** Exclusive upper bound of the random key, so keys are "0", "1" or "2". */
	private static final int KEY_BOUND = 3;

	/** Name of the topic every message is sent to. */
	private String topic;
	/** Producer used to send one message per line read. */
	private Producer<String, String> producer;

	/**
	 * Creates an adapter bound to a topic and a producer.
	 *
	 * @param topic    topic name passed to every {@link KeyedMessage}
	 * @param producer producer that sends the messages; it is not closed by this class
	 */
	public KeywordFileAdapter(String topic, Producer<String, String> producer) {
		this.topic = topic;
		this.producer = producer;
	}

	/**
	 * Reads the created file line by line and sends each line as a message.
	 * <p>
	 * Each message carries a random key in the range 0..2 (as a string). Any
	 * exception raised while reading or sending is printed to standard error
	 * and ends processing of this file; it is never propagated to the caller.
	 *
	 * @param file the file that was just created
	 */
	@Override
	public void onFileCreate(File file) {
		BufferedReader reader = null;
		try {
			// NOTE(review): FileReader decodes with the platform default charset —
			// confirm this matches the encoding of the keyword files.
			reader = new BufferedReader(new FileReader(file));
			Random keySource = new Random();
			for (String text = reader.readLine(); text != null; text = reader.readLine()) {
				int keyValue = keySource.nextInt(KEY_BOUND);
				System.out.println("for the topic: " + topic + "---send message :" + text);
				String key = Integer.toString(keyValue);
				producer.send(new KeyedMessage<String, String>(topic, key, text));
			}
		} catch (Exception e) {
			// Best effort: report the failure and give up on the rest of this file.
			e.printStackTrace();
		} finally {
			closeAndReport(reader);
		}
	}

	/**
	 * Closes the reader if one was opened, printing (not throwing) any close failure.
	 *
	 * @param reader reader to close; may be {@code null} when opening the file failed
	 */
	private static void closeAndReport(BufferedReader reader) {
		if (reader == null) {
			return;
		}
		try {
			reader.close();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

}
